package cn.uncode.rpc.transport.netty;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import cn.uncode.rpc.common.ChannelState;
import cn.uncode.rpc.common.CommonConstant;
import cn.uncode.rpc.common.log.Logger;
import cn.uncode.rpc.common.log.LoggerFactory;
import cn.uncode.rpc.core.Request;
import cn.uncode.rpc.core.Response;
import cn.uncode.rpc.core.URL;
import cn.uncode.rpc.core.URLParam;
import cn.uncode.rpc.exception.TransportException;
import cn.uncode.rpc.stats.StatisticCallback;
import cn.uncode.rpc.transport.support.AbstractClient;
import cn.uncode.rpc.util.FrameworkUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;




/**
 * 
 * <pre>
 * 		netty client 相关
 * 			1)  timeout 设置 （connecttimeout，sotimeout, application timeout）
 * 			2） 线程池设置
 *  		3） 最大连接池设置
 * 			4） 最大消息队列设置 (netty channel内部: writeQueue)
 * 			5） 最大返回数据包设置
 * 			6） RPC 的测试的时候，需要非常关注 OOM的问题
 * </pre>
 * 
 * @author maijunsheng
 * @version 创建时间：2013-5-31
 * 
 */
public class NettyClient extends AbstractClient implements StatisticCallback {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);
	
	private Bootstrap bootstrap = null;

	private final EventLoopGroup eventLoopGroupWorker;
	private DefaultEventExecutorGroup defaultEventExecutorGroup;
	private Channel channel = null;
	
	// 回收过期任务
	private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4);

	// 异步的request，需要注册callback future
	// 触发remove的操作有： 1) service的返回结果处理。 2) timeout thread cancel
	protected ConcurrentMap<Long, NettyResponseFuture> callbackMap = new ConcurrentHashMap<Long, NettyResponseFuture>();

	private ScheduledFuture<?> timeMonitorFuture = null;


	// 连续失败次数
	private AtomicLong errorCount = new AtomicLong(0);
	// 最大连接数
	private int maxClientConnection = 0;

	public NettyClient(URL url) {
		super(url);

		maxClientConnection = url.getIntParameter(URLParam.MAX_CLIENT_CONNECTION.getName(),
				URLParam.MAX_CLIENT_CONNECTION.getIntValue());

		timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay(
				new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()),
				CommonConstant.NETTY_TIMEOUT_TIMER_PERIOD, CommonConstant.NETTY_TIMEOUT_TIMER_PERIOD,
				TimeUnit.MILLISECONDS);
		this.eventLoopGroupWorker = new NioEventLoopGroup();
		this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(4);
	}


	@Override
	public void heartbeat(Request request) {
		// 如果节点还没有初始化或者节点已经被close掉了，那么heartbeat也不需要进行了
		if (state.isUnInitState() || state.isCloseState()) {
			LOGGER.warn("NettyClient heartbeat Error: state={} url={}", state.name(), url.getUri());
			return;
		}

		LOGGER.info("NettyClient heartbeat request: url={}", url.getUri());

		try {
			// async request后，如果service is
			// available，那么将会自动把该client设置成可用
			send(request);
		} catch (Exception e) {
			LOGGER.error("NettyClient heartbeat Error: url=" + url.getUri(), e);
		}
	}

	/**
	 * 请求remote service
	 * 
	 * <pre>
	 * 		1)  get connection from pool
	 * 		2)  async requset
	 * 		3)  return connection to pool
	 * 		4)  check if async return response, true: return ResponseFuture;  false: return result
	 * </pre>
	 * 
	 * @param request
	 * @param async
	 * @return
	 * @throws TransportException
	 */
	public Response send(Request request) throws TransportException {

		Response response = null;
		if (!isAvailable()) {
			throw new TransportException("NettyChannel is unavaliable: url=" + url.getUri()
					+ FrameworkUtil.toString(request));
		}
		
		boolean async = url.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), 
				URLParam.ASYNC.getName(), URLParam.ASYNC.getBooleanValue());
			
		if (channel == null) {
			channel = initClientBootstrap();
		}
		
		if (channel == null) {
			LOGGER.error("NettyClient borrowObject null: url=" + url.getUri() + " " + FrameworkUtil.toString(request));
			return null;
		}
		
		int timeout = getUrl().getIntParameter(URLParam.REQUEST_TIMEOUT.getName(), URLParam.REQUEST_TIMEOUT.getIntValue());
		final NettyResponseFuture responseFuture = new NettyResponseFuture(request, timeout);
		registerCallback(request.getRequestId(), responseFuture);
		
			if(channel != null){
				this.channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
					@Override
					public void operationComplete(ChannelFuture future) throws Exception {
						if (future.isSuccess()) {
//							responseFuture.setIsOk(true);
							return;
						} else {
//							responseFuture.setIsOk(false);
						}

						// 写入失败了,就从缓存中移掉这个请求
						callbackMap.remove(responseFuture.getRequest().getRequestId(), responseFuture);

//						responseFuture.setCause(future.cause());
//						responseFuture.setResponse(null);
						LOGGER.warn("send a request to channel <{}> failed.\nREQ:{}", future.channel(), request);
					}
				});

				try {
					response = responseFuture.waitResponse(10, TimeUnit.SECONDS);
				} catch (InterruptedException e) {
					// ignore e;
				}
				if (null == response) {
//					if (responseFuture.isOk()) {
//						throw new TimeoutException(
//								String.format("wait response on the channel <%s> timeout 10 (s).", this.channel),
//								responseFuture.getCause()
//						);
//					} else {
//						throw new SendRequestException(
//								String.format("send request to the channel <%s> failed.", this.channel),
//								responseFuture.getCause());
//					}
				} else {
					//LOGGER.debug(String.format("send a request to channel <%s> success.\nREQ:%s\nRES:%s", this.channel, request, response));
				}
			}
			return response;
			
	}


	@Override
	public boolean connect() throws TransportException{
		if (isAvailable()) {
			return true;
		}
		// 初始化netty client bootstrap
		channel = initClientBootstrap();
		LOGGER.info("NettyClient finish Open: url={}", url);
		// 注册统计回调
//		StatsUtil.registryStatisticCallback(this);
		
		// 设置可用状态
		state = ChannelState.ALIVE;
		return state.isAliveState();
	}

	/**
	 * 初始化 netty clientBootstrap
	 */
	private Channel initClientBootstrap() {
		// 实际上，极端情况下，connectTimeout会达到500ms，因为netty nio的实现中，是依赖BossThread来控制超时，
		// 如果为了严格意义的timeout，那么需要应用端进行控制。
		int timeout = getUrl().getIntParameter(URLParam.REQUEST_TIMEOUT.getName(),
				URLParam.REQUEST_TIMEOUT.getIntValue());
		if (timeout <= 0) {
			throw new TransportException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.");
		}
		Bootstrap bootsp = new Bootstrap();
		// 最大响应包限制
		final int maxContentLength = url.getIntParameter(URLParam.MAX_CONTENT_LENGTH.getName(),
				URLParam.MAX_CONTENT_LENGTH.getIntValue());

		bootstrap = bootsp.group(this.eventLoopGroupWorker)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.option(ChannelOption.SO_KEEPALIVE, true)
				.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					public void initChannel(SocketChannel ch) throws Exception {
						ch.pipeline().addLast(defaultEventExecutorGroup,
								// new LoggingHandler("client", LogLevel.INFO),
								new NettyDecoder(codec, maxContentLength), 
								new NettyEncoder(codec), 
								new NettyClientHandler());
					}
				});
		 return getChannel();
	}
	
	private Channel getChannel() {
		ChannelFuture channelFuture = bootstrap.bind(url.getHost(), url.getPort());
		try {
			channelFuture.sync();
			LOGGER.info("connect {}:{} ok.", url.getHost(), url.getPort());
		} catch (InterruptedException e) {
			throw new TransportException(e.getMessage(), e);
		}
		return channelFuture.channel();
    }
	
	class NettyClientHandler extends SimpleChannelInboundHandler<Response> {
		@Override
		protected void channelRead0(ChannelHandlerContext ctx, Response response) throws Exception {
			long id = response.getRequestId();
			NettyResponseFuture responseFuture = callbackMap.remove(id);
			if (responseFuture != null) {
				//LOGGER.debug("receive request id:{} response data.", id);
				if (response.getException() != null) {
					responseFuture.onFailure(response);
				} else {
					responseFuture.onSuccess(response);
				}
			} else {
				LOGGER.warn("NettyClient has response from server, but resonseFuture not exist,  requestId={}", response.getRequestId());
			}
		}
	}
	
	
	
	

	@Override
	public synchronized void close() {
		if (state.isCloseState()) {
			LOGGER.info("NettyClient close fail: already close, url={}", url.getUri());
			return;
		}

		// 如果当前nettyClient还没有初始化，那么就没有close的理由。
		if (state.isUnInitState()) {
			LOGGER.info("NettyClient close Fail: don't need to close because node is unInit state: url={}",
					url.getUri());
			return;
		}

		try {
			// 取消定期的回收任务
			timeMonitorFuture.cancel(true);
			// 清空callback
			callbackMap.clear();

			// 设置close状态
			state = ChannelState.CLOSE;
			// 解除统计回调的注册
//			StatsUtil.unRegistryStatisticCallback(this);
			LOGGER.info("NettyClient close Success: url={}", url.getUri());
		} catch (Exception e) {
			LOGGER.error("NettyClient close Error: url=" + url.getUri(), e);
		}
	}


	@Override
	public boolean isClosed() {
		return state.isCloseState();
	}

	@Override
	public boolean isAvailable() {
		return state.isAliveState();
	}

	@Override
	public URL getUrl() {
		return url;
	}


	/**
	 * 增加调用失败的次数：
	 * 
	 * <pre>
	 * 	 	如果连续失败的次数 >= maxClientConnection, 那么把client设置成不可用状态
	 * </pre>
	 * 
	 */
	void incrErrorCount() {
		long count = errorCount.incrementAndGet();

		// 如果节点是可用状态，同时当前连续失败的次数超过限制maxClientConnection次，那么把该节点标示为不可用
		if (count >= maxClientConnection && state.isAliveState()) {
			synchronized (this) {
				count = errorCount.longValue();

				if (count >= maxClientConnection && state.isAliveState()) {
					LOGGER.error("NettyClient unavailable Error: url=" + url.getIdentity() + " "
							+ url.getServerPortStr());
					state = ChannelState.UNALIVE;
				}
			}
		}
	}

	/**
	 * 重置调用失败的计数 ：
	 * 
	 * <pre>
	 * 把节点设置成可用
	 * </pre>
	 * 
	 */
	void resetErrorCount() {
		errorCount.set(0);

		if (state.isAliveState()) {
			return;
		}

		synchronized (this) {
			if (state.isAliveState()) {
				return;
			}

			// 如果节点是unalive才进行设置，而如果是 close 或者 uninit，那么直接忽略
			if (state.isUnAliveState()) {
				long count = errorCount.longValue();

				// 过程中有其他并发更新errorCount的，因此这里需要进行一次判断
				if (count < maxClientConnection) {
					state = ChannelState.ALIVE;
					LOGGER.info("NettyClient recover available: url=" + url.getIdentity() + " "
							+ url.getServerPortStr());
				}
			}
		}
	}

	/**
	 * 注册回调的resposne
	 * 
	 * <pre>
	 * 
	 * 		进行最大的请求并发数的控制，如果超过NETTY_CLIENT_MAX_REQUEST的话，那么throw reject exception
	 * 
	 * </pre>
	 * 
	 * @throws MotanServiceException
	 * @param requestId
	 * @param nettyResponseFuture
	 */
	public void registerCallback(long requestId, NettyResponseFuture nettyResponseFuture) {
		if (this.callbackMap.size() >= CommonConstant.NETTY_CLIENT_MAX_REQUEST) {
			// reject request, prevent from OutOfMemoryError
			throw new TransportException("NettyClient over of max concurrent request, drop request, url: "
					+ url.getUri() + " requestId=" + requestId);
		}

		this.callbackMap.put(requestId, nettyResponseFuture);
	}

	/**
	 * 统计回调接口
	 */
	@Override
	public String statisticCallback() {
		//避免消息泛滥，如果节点是可用状态，并且堆积的请求不超过100的话，那么就不记录log了
		if (isAvailable() && callbackMap.size() < 100) {
			return null;
		}

		return String.format("identity: %s available: %s concurrent_count: %s", url.getIdentity(), isAvailable(),
				callbackMap.size());
	}

	/**
	 * 移除回调的response
	 * 
	 * @param requestId
	 * @return
	 */
	public NettyResponseFuture removeCallback(long requestId) {
		return callbackMap.remove(requestId);
	}


	/**
	 * 回收超时任务
	 * 
	 * @author maijunsheng
	 * 
	 */
	class TimeoutMonitor implements Runnable {
		private String name;

		public TimeoutMonitor(String name) {
			this.name = name;
		}

		public void run() {

			long currentTime = System.currentTimeMillis();

			for (Map.Entry<Long, NettyResponseFuture> entry : callbackMap.entrySet()) {
				try {
					NettyResponseFuture future = entry.getValue();

					if (future.getCreateTime() + future.getTimeout() < currentTime) {
						// timeout: remove from callback list, and then cancel
						removeCallback(entry.getKey());
						future.cancel();
					} 
				} catch (Exception e) {
					LOGGER.error(
							name + " clear timeout future Error: uri=" + url.getUri() + " requestId=" + entry.getKey(),
							e);
				}
			}
		}
	}

	@Override
	public boolean reConnect() throws TransportException {
		// TODO Auto-generated method stub
		return false;
	}

}
